import os
import re
import sys
import random
import getopt
import os.path
import subprocess
from datetime import datetime

import ma.commons.core.constants as constants
from ma.utils.clustersetup import *


# Log file names as produced on the master / nodetracker machines.
master_log_filename = 'ma_master.log'
nodetracker_log_filename = 'ma_logger.log'
nohup_filename = 'nohup.out'

# Each log event has two regexes: an `egrep_*` POSIX-ERE pattern used as a
# coarse shell-side pre-filter over the (potentially huge) log, and an
# `outfile_*` Python pattern with capture groups that extracts the fields
# from the surviving lines.
outfile_map_assigned_search_re = r"Assigned Map task (\d+) with job_id (\d+)(?:, struct-id (\S+))? to NT (\S+) \((\S+)\)"
egrep_map_assigned_re = r"Assigned Map task [[:digit:]]+ with job_id [[:digit:]]+"
outfile_red_assigned_search_re = r"Assigned Reduce task (\d+) with job_id (\d+)(?:, struct-id (\S+) level (\S+) no_inputs (\d+))? to NT (\S+) \((\S+)\)"
egrep_red_assigned_re = r"Assigned Reduce task [[:digit:]]+ with job_id [[:digit:]]+"

outfile_map_complete_search_re = r"Mark MAP complete called for (\d+)_(\d+)_\w job-id (\d+)(?:, struct-id (\S+) \| (\d+) reduce stubs created)?"
egrep_map_complete_re = r"Mark MAP complete called for [[:digit:]]+_[[:digit:]]+_\w job-id [[:digit:]]+"
outfile_red_complete_search_re = r"Mark REDUCE complete called for (\d+)_(\d+)_\w job-id (\d+)(?:, struct-id (\S+))?"
egrep_red_complete_re = r"Mark REDUCE complete called for [[:digit:]]+_[[:digit:]]+_\w job-id [[:digit:]]+"

# Common prefix of every ma_logger.log line: timestamp, log level, origin.
logger_initial_line_re = r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+(\w+)\s+(.+?)\s+'


#################### FIND INCOMPLETE TASKS ####################

def get_re_list_from_catout(cat_filename, egrep_search_re, out_file_search_re):
    """Grep `cat_filename` for lines matching the POSIX-ERE `egrep_search_re`,
    then run the Python regex `out_file_search_re` over the surviving lines
    and return the list of tuples from re.findall().

    The egrep pass is a coarse shell-side pre-filter so the Python regex only
    scans the relevant lines of a potentially huge log.
    """
    out_temp_filename = 'temp_' + '%.3f' % (random.random() * 10000)
    # NOTE(review): filename/pattern are interpolated into a shell command.
    # Fine for trusted local logs, but not safe for untrusted input.
    os.system("egrep '%s' %s > %s" % (egrep_search_re, cat_filename, out_temp_filename))
    try:
        # context manager so the handle is closed even if findall raises;
        # also avoid shadowing the builtin `str` as the original did
        with open(out_temp_filename, 'r') as fd:
            contents = fd.read()
        return re.findall(out_file_search_re, contents)
    finally:
        # remove the temp file on every path, not just on success
        if os.path.exists(out_temp_filename):
            os.remove(out_temp_filename)
 

def assigned_map_task_breakup(tpl):
    """Reorder an 'Assigned Map' findall tuple into
    (job_id, task_id, struct_id, nodetracker_id, nodetracker_ip)."""
    task_id, job_id, struct_id, nt_id, nt_ip = tpl
    return job_id, task_id, struct_id, nt_id, nt_ip

def assigned_reduce_task_breakup(tpl):
    """Reorder an 'Assigned Reduce' findall tuple into
    (job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip)."""
    task_id, job_id, struct_id, level_no, no_inputs, nt_id, nt_ip = tpl
    return job_id, task_id, struct_id, level_no, no_inputs, nt_id, nt_ip

def completed_map_task_breakup(tpl):
    """Break a 'Mark MAP complete' findall tuple into
    (job_id, task_id, struct_id, no_new_reduces_made).

    tpl[0]/tpl[1] come from the `<job>_<task>_<X>` tag and tpl[2] from the
    explicit job-id field; the two job-ids must agree.
    """
    if tpl[0] != tpl[2]:
        raise Error('The job-id is not the same in Complete Map statement: %s' % str(tpl))
    return tpl[0], tpl[1], tpl[3], tpl[4]

def completed_reduce_task_breakup(tpl):
    """Break a 'Mark REDUCE complete' findall tuple into
    (job_id, task_id, struct_id).

    The job-id embedded in the `<job>_<task>_<X>` tag (tpl[0]) must match
    the explicit job-id field (tpl[2]).
    """
    if tpl[0] != tpl[2]:
        raise Error('The job-id is not the same in Complete Reduce statement: %s' % str(tpl))
    return tpl[0], tpl[1], tpl[3]


def assigned_task_list_to_key(job_id, task_id, map_or_red, struct_id=""):
    """Build the dict key '<job>_<M|R>_<task>_<struct>' used to pair each
    assignment record with its completion record."""
    return '_'.join(str(part) for part in (job_id, map_or_red, task_id, struct_id))
    
    
def find_incomplete_tasks():
    """Print every Map and Reduce task that appears as assigned in the
    master log but has no matching completion record.

    Raises Error when a completion record exists without an assignment,
    which would indicate a corrupt or truncated log.
    """
    #---------------- MAPS ----------------#
    # index every assigned map by its job/task/struct key
    assigned_map_list = get_re_list_from_catout(master_log_filename, egrep_map_assigned_re, outfile_map_assigned_search_re)
    assigned_map_dict = {}
    for tpl in assigned_map_list:
        job_id, task_id, struct_id, nodetracker_id, nodetracker_ip = assigned_map_task_breakup(tpl)
        key_str = assigned_task_list_to_key(job_id, task_id, constants.MAP, struct_id)
        assigned_map_dict[key_str] = tpl

    # drop every map that was later marked complete
    completed_map_list = get_re_list_from_catout(master_log_filename, egrep_map_complete_re, outfile_map_complete_search_re)
    for tpl in completed_map_list:
        job_id, task_id, struct_id, no_new_reduces_made = completed_map_task_breakup(tpl)
        key_str = assigned_task_list_to_key(job_id, task_id, constants.MAP, struct_id)
        if key_str not in assigned_map_dict:
            raise Error('A map has been marked complete without being assigned: %s' % str(tpl))
        del assigned_map_dict[key_str]

    # whatever is left was assigned but never completed
    print('MAPS that have been assigned but not completed!')
    # sort numerically by task id; list.sort(cmp=...) was removed in Python 3
    assigned_map_vals = sorted(assigned_map_dict.values(), key=lambda tpl: int(tpl[0]))
    for tpl in assigned_map_vals:
        job_id, task_id, struct_id, nodetracker_id, nodetracker_ip = assigned_map_task_breakup(tpl)
        print('job_id: %s, task_id: %s, struct_id: %s, nt_id: %s (%s)' % (job_id, task_id, struct_id, nodetracker_id, nodetracker_ip))


    #---------------- REDUCES ----------------#
    # index every assigned reduce by its job/task/struct key
    assigned_red_list = get_re_list_from_catout(master_log_filename, egrep_red_assigned_re, outfile_red_assigned_search_re)
    assigned_red_dict = {}
    for tpl in assigned_red_list:
        job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip = assigned_reduce_task_breakup(tpl)
        # constants.REDUCE instead of a hard-coded 'R', consistent with the
        # map branch (the key only needs to be internally consistent)
        key_str = assigned_task_list_to_key(job_id, task_id, constants.REDUCE, struct_id)
        assigned_red_dict[key_str] = tpl

    # drop every reduce that was later marked complete
    completed_red_list = get_re_list_from_catout(master_log_filename, egrep_red_complete_re, outfile_red_complete_search_re)
    for tpl in completed_red_list:
        job_id, task_id, struct_id = completed_reduce_task_breakup(tpl)
        key_str = assigned_task_list_to_key(job_id, task_id, constants.REDUCE, struct_id)
        if key_str not in assigned_red_dict:
            raise Error('A reduce has been marked complete without being assigned: %s' % str(tpl))
        del assigned_red_dict[key_str]

    # print reduces that have not been marked completed
    print('REDUCES that have been assigned but not completed!')
    assigned_red_vals = sorted(assigned_red_dict.values(), key=lambda tpl: int(tpl[0]))
    for tpl in assigned_red_vals:
        job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip = assigned_reduce_task_breakup(tpl)
        print('job_id: %s, task_id: %s, struct_id: %s, level_no: %s, no_inputs: %s, nt_id: %s (%s)' % (job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip))



#################### DEBUG TRACE TASKS ####################

def return_task_id(job_id, task_id, struct_id, m_or_r):
    """Build the compact task tag '<job><M|R><task>' (e.g. '1M3').

    NOTE(review): struct_id is accepted but never used in the tag —
    confirm this is intentional.
    """
    return '%s%s%s' % (job_id, m_or_r, task_id)
    
def match_and_print_log_line_re(match_re, log_str):
    """Find every logger line whose body matches `match_re` (with the
    standard timestamp/level/origin prefix prepended) and print each hit.

    Returns a list of (datetime, level, origin, extra_groups) tuples, or
    None when nothing matched.
    """
    hits = [
        (datetime.strptime(tpl[0], '%Y-%m-%d %H:%M:%S'), tpl[1], tpl[2], tpl[3:])
        for tpl in re.findall(logger_initial_line_re + match_re, log_str)
    ]
    if not hits:
        print('XXXXXXXXXX Cannot find the line to match the RE:', match_re, 'XXXXXXXXXX')
        return None
    for hit in hits:
        print('%s %-40s %s' % (hit[0], hit[2], hit[3:]))
    return hits

def match_and_print_nohup_line_re(match_re, log_str):
    """Print every match of `match_re` in the nohup output, indented to
    line up with the timestamped log-line printer.

    Returns the re.findall() result, or None when nothing matched.
    """
    found_list = re.findall(match_re, log_str)
    if not found_list:
        print('XXXXXXXXXX Cannot find the line to match the RE:', match_re, 'XXXXXXXXXX')
        return None
    for match in found_list:
        print(' ' * 61 + str(match))
    return found_list

def read_process_output(cmd):
    """Run `cmd` through the shell and return its stdout as text.

    Decodes the output to str (communicate() returns bytes by default on
    Python 3) so callers can compare against '' and pass it to int().
    """
    # NOTE(review): shell=True with an interpolated command string — callers
    # only pass trusted, locally-built commands.
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                            universal_newlines=True)
    # communicate() already waits for the process; no extra wait() needed
    text = proc.communicate()[0]
    return text

def sys_exit(nodetracker_log_temp, nohup_temp):
    """Delete the downloaded temp log copies (when present) and terminate
    the process."""
    for temp_file in (nodetracker_log_temp, nohup_temp):
        if os.path.exists(temp_file):
            os.remove(temp_file)
    sys.exit()
    
def copy_log_files(nodetracker_ip):
    """Fetch ma_logger.log and nohup.out from the nodetracker that ran the
    task into uniquely-named local temp files.

    Returns (nodetracker_log_temp, nohup_temp) — the local file names.
    """
    print("The task was run at", nodetracker_ip)

    # random suffixes keep concurrent debug runs from clobbering each other
    nodetracker_log_temp = 'temp_' + '%.3f' % (random.random() * 10000)
    nohup_temp = 'temp_' + '%.3f' % (random.random() * 10000)

    if nodetracker_ip == MASTER_INTERNAL_IP:
        # task ran on the master itself, so a plain local copy is enough
        os.system('cp %s %s' % (nodetracker_log_filename, nodetracker_log_temp))
        os.system('cp %s %s' % (nohup_filename, nohup_temp))
    else:
        # pull the files from the remote nodetracker over scp
        remote_prefix = '%s@%s:%s' % (BACK_END_ACCESS_USER, nodetracker_ip, REMOTE_RUN_PATH)
        os.system('scp %s%s %s' % (remote_prefix, nodetracker_log_filename, nodetracker_log_temp))
        os.system('scp %s%s %s' % (remote_prefix, nohup_filename, nohup_temp))

    print('Copied log files')

    return (nodetracker_log_temp, nohup_temp)


def debug_trace_map_task(job_id, task_id, struct_id, nodetracker_id, nodetracker_ip, args):
    """Trace one map task's whole lifecycle through the logs of the
    nodetracker that ran it, printing every milestone found.

    Copies ma_logger.log / nohup.out from `nodetracker_ip`, then walks the
    expected sequence of log lines (task init -> input copy -> child
    process -> outputs -> completion).  Exits via sys_exit() at the first
    missing milestone; the temp log copies are always cleaned up.  When
    'ch' is in `args`, also ssh to the node to verify output file sizes.
    """
    # initialise before `try` so the finally-block cleanup never references
    # unbound names when copy_log_files() itself raises
    nodetracker_log_temp = nohup_temp = ''
    try:
        nodetracker_log_temp, nohup_temp = copy_log_files(nodetracker_ip)

        task_id_tag = return_task_id(job_id, task_id, struct_id, constants.MAP)

        # read the nodetracker log
        with open(nodetracker_log_temp, 'r') as fd:
            nt_str = fd.read()

        match_re = r'(A Map task %s initiated with Job Id %s)[\n\r]' % (str(task_id), str(job_id))
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r'(ProcessRunner %s init complete)[\n\r]' % task_id_tag
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r'(Started MapTIP for %s)[\n\r]' % task_id_tag
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r'(Copying map input for %s:\s+)(.+?)[\n\r]' % task_id_tag
        # the matcher returns None on a miss — guard before iterating
        found = match_and_print_log_line_re(match_re, nt_str)
        if not found: sys_exit(nodetracker_log_temp, nohup_temp)
        inp_file_list = [i[3][1] for i in found]

        dest_input_path = ''

        # check that every input file was copied out of HDFS to a local path
        for inp_file in inp_file_list:
            filepath, filename = os.path.split(inp_file)
            match_re = r"(Executing HDFS command: )\['.+?', '.+?', ('.+?'), '%s', '(.+?)'\][\n\r]" % inp_file
            found = match_and_print_log_line_re(match_re, nt_str)
            if not found: sys_exit(nodetracker_log_temp, nohup_temp)
            output_paths = [i[3][2] for i in found]
            match_re = r"(Copied HDFS file %s to a local path) %s[\n\r]" % (inp_file, output_paths[0])
            if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)
            dest_input_path = output_paths[0]

        match_re = r"(Copied all map inputs for %s: )(.+?)[\n\r]" % task_id_tag
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r"(Child process %s running: )(\d+)[\n\r]" % task_id_tag
        out = match_and_print_log_line_re(match_re, nt_str)
        # `len(out) == 0` crashed with TypeError when the matcher returned None
        if not out: sys_exit(nodetracker_log_temp, nohup_temp)
        process_id = out[0][3][1]
        process_id_task_runner = process_id

        # read the nohup file
        with open(nohup_temp, 'r') as fd:
            nohup_str = fd.read()

        match_re = r"[\n\r](Running )(\w+)( process pid %s)[\n\r]" % process_id_task_runner
        out = match_and_print_nohup_line_re(match_re, nohup_str)

        if not out:
            # fall back: find the runner via the "received all args" line
            print("-> Not found maptaskrunner with the given process pid: Trying alternate method")
            match_re = r"[\n\r](\w+)( received all args for %s process pid )(\d+)[\n\r]" % task_id_tag
            out = match_and_print_nohup_line_re(match_re, nohup_str)
            if not out: sys_exit(nodetracker_log_temp, nohup_temp)
            process_id_task_runner = out[0][2]
            match_re = r"[\n\r](Running )(\w+)( process pid %s)[\n\r]" % process_id_task_runner
            out = match_and_print_nohup_line_re(match_re, nohup_str)

        maptaskrunner_name = out[0][1]
        match_re = r"[\n\r](%s received all args for %s process pid %s)[\n\r]" % (maptaskrunner_name, task_id_tag, process_id_task_runner)
        if not match_and_print_nohup_line_re(match_re, nohup_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r"(TaskRunner inited %s)[\n\r]" % task_id_tag
        if not match_and_print_log_line_re(match_re, nohup_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r"(Actual processing on map %s STARTED in a separate PROCESS)[\n\r]" % task_id_tag
        if not match_and_print_log_line_re(match_re, nohup_str): sys_exit(nodetracker_log_temp, nohup_temp)

        # check that every input file was opened and read completely
        for inp_file in inp_file_list:
            filepath, filename = os.path.split(inp_file)
            inp_path = os.path.join(dest_input_path, filename)
            match_re = "(Input file to map %s: %s)[\n\r]" % (task_id_tag, inp_path)
            if not match_and_print_log_line_re(match_re, nohup_str): sys_exit(nodetracker_log_temp, nohup_temp)
            match_re = "(Input file to map %s read completely: %s)[\n\r]" % (task_id_tag, inp_path)
            if not match_and_print_log_line_re(match_re, nohup_str): sys_exit(nodetracker_log_temp, nohup_temp)

        match_re = r"(Actual processing on map %s ENDED)[\n\r]" % task_id_tag
        if not match_and_print_log_line_re(match_re, nohup_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r"(Starting to write map %s outputs)[\n\r]" % task_id_tag
        if not match_and_print_log_line_re(match_re, nohup_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r"(Written map %s output: )(.+?)[\n\r]" % task_id_tag
        found = match_and_print_log_line_re(match_re, nohup_str)
        if not found: sys_exit(nodetracker_log_temp, nohup_temp)
        output_files = [i[3][1] for i in found]
        match_re = r"(Written )(\d+)( outputs for map %s)[\n\r]" % task_id_tag
        out = match_and_print_log_line_re(match_re, nohup_str)
        if not out: sys_exit(nodetracker_log_temp, nohup_temp)
        no_output_files = out[0][3][1]

        # was `if not len(output_files) != no_output_files` — doubly wrong:
        # inverted logic AND a str-vs-int comparison that could never hold
        if len(output_files) != int(no_output_files):
            print("The number of output files do not match up from the last log statement")

        # optionally verify the size of each output file on the remote node
        if 'ch' in args:
            for dest_filepath in output_files:
                out_txt = read_process_output('ssh ' + BACK_END_ACCESS_USER + '@' + nodetracker_ip + " 'ls -l " + dest_filepath + "' | awk '{print $5;}'")
                if isinstance(out_txt, bytes):
                    # communicate() may hand back bytes; normalise to str so
                    # the '' comparison and int() below behave correctly
                    out_txt = out_txt.decode()
                if out_txt != "":
                    print("----> " + dest_filepath + " is " + str(int(out_txt)) + "b in size")
                else:
                    print("The " + dest_filepath + " does not exist at " + nodetracker_ip)
                    sys_exit(nodetracker_log_temp, nohup_temp)

        match_re = "(Child process %s - pid %s ended in )(.+?)[\n\r]" % (task_id_tag, process_id)
        out = match_and_print_log_line_re(match_re, nt_str)
        if not out or out[0][3][1] != "success": sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r"(NT got a notification from an %s task %s of job-id %s for completion)[\n\r]" % (constants.MAP, str(task_id), str(job_id))
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r"(Completing Map Task %s of job-id %s)[\n\r]" % (str(task_id), str(job_id))
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)

        print("----------- Task %s apparently reached completion -----------" % task_id_tag)

    finally:
        # always remove the temp copies; this also terminates the process
        sys_exit(nodetracker_log_temp, nohup_temp)
        

def debug_trace_red_task(job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip):
    """Trace a reduce task's lifecycle through the nodetracker logs.

    Only the first milestones (task init, ProcessRunner init) are checked;
    the rest of the trace raises NotImplementedError.  Temp log copies are
    always cleaned up via sys_exit() in the finally block.
    """
    # initialise before `try` so the finally-block cleanup never references
    # unbound names when copy_log_files() itself raises
    nodetracker_log_temp = nohup_temp = ''
    try:
        nodetracker_log_temp, nohup_temp = copy_log_files(nodetracker_ip)

        task_id_tag = return_task_id(job_id, task_id, struct_id, constants.REDUCE)

        # read the nodetracker log
        with open(nodetracker_log_temp, 'r') as fd:
            nt_str = fd.read()

        match_re = r'(A Reduce task %s initiated with Job Id %s)[\n\r]' % (str(task_id), str(job_id))
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)
        match_re = r'(ProcessRunner %s init complete)[\n\r]' % task_id_tag
        if not match_and_print_log_line_re(match_re, nt_str): sys_exit(nodetracker_log_temp, nohup_temp)

        print(job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip)
        raise NotImplementedError('This has still not been implemented')
    finally:
        sys_exit(nodetracker_log_temp, nohup_temp)


def debug_trace_task(task_id_breakup, args):
    """Locate the assignment record for the task described by
    `task_id_breakup` = (job_id, M|R, task_no) and hand it off to the
    matching map/reduce tracer."""
    job_no, m_or_r, task_no = task_id_breakup[0], task_id_breakup[1], task_id_breakup[2]
    print("Searching", m_or_r, "task with id", task_no, "of job", job_no)

    args = list(args)

    if m_or_r == constants.MAP:
        # specialise the first two (\d+) groups of the assignment regex to
        # this exact task-id and job-id
        search_str = outfile_map_assigned_search_re.replace(r"\d+", "%s", 2) % (task_no, job_no)
        assigned_map_list = get_re_list_from_catout(master_log_filename, egrep_map_assigned_re, search_str)
        if assigned_map_list:
            job_id, task_id, struct_id, nodetracker_id, nodetracker_ip = assigned_map_task_breakup(assigned_map_list[0])
            debug_trace_map_task(job_id, task_id, struct_id, nodetracker_id, nodetracker_ip, args)
            print("A total of %d maps with this same task-id" % len(assigned_map_list))
        else:
            print("Cannot find a map with this task-id ", task_id_breakup)
    else:
        search_str = outfile_red_assigned_search_re.replace(r"\d+", "%s", 2) % (task_no, job_no)
        assigned_red_list = get_re_list_from_catout(master_log_filename, egrep_red_assigned_re, search_str)
        if assigned_red_list:
            job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip = assigned_reduce_task_breakup(assigned_red_list[0])
            debug_trace_red_task(job_id, task_id, struct_id, level_no, no_inputs, nodetracker_id, nodetracker_ip)
            print("A total of %d reduces with this same task-id" % len(assigned_red_list))
        else:
            print("Cannot find a reduce with this task-id ", task_id_breakup)
            
        
#################### FIND INCOMPLETE TASKS ####################

def usage_help():
    """Print the command-line usage text for the distributed debugger."""
    print("""MR+ Distributed Debugger
------------------------
Useful for pin-pointing which tasks have gone awry during the
life of a job. It also has options to diagnose each individual
task by tracing its lifespan. This command is supposed to be
run from the master node.

Usage: python distrdebug.py COMMAND
where COMMAND is one of:
  -h | --help\t\t\tdisplays this help
  -f | --findinc\t\tfinds all assigned but incomplete tasks
  -d | --debugtask <task_id>\tdebug trace task given its task_id in the format [job_id][M|R][task_id_no] e.g. 1M3, 1R205
\tOptions: ch\t\tcheck sizes of files copied
""")
    
    
if __name__ == "__main__":
    # parse the command line; any getopt problem falls through to usage
    try:
        if len(sys.argv) <= 1:
            raise getopt.GetoptError('No arguments given')
        opts, args = getopt.getopt(sys.argv[1:], "hfd:", ["help", "findinc", "debugtask="])
    except getopt.GetoptError:
        print("---Invalid usage---")
        usage_help()
        sys.exit(2)

    # keep only the recognised per-task options
    args = [extra for extra in args if extra in ['ch']]

    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage_help()
            sys.exit()
        elif opt in ("-f", "--findinc"):
            find_incomplete_tasks()
            sys.exit()
        elif opt in ("-d", "--debugtask"):
            # task id looks like [job_id][M|R][task_no], e.g. 1M3 or 1R205
            task_pattern = r'^(\d+)([' + constants.MAP + constants.REDUCE + r'])(\d+)$'
            task_id_breakup = re.findall(task_pattern, arg)
            if not task_id_breakup:
                print("---Invalid task_id---")
                usage_help()
                sys.exit(2)
            debug_trace_task(task_id_breakup[0], args)
            sys.exit()



"""
Map flow
-- ma_logger.log -> 2009-07-23 13:06:55 INFO nodetracker.startNewMapTask A Map task 0 initiated with Job Id 1
-- ma_logger.log -> 2009-07-23 16:04:33 DEBUG processrunner.__init__ ProcessRunner 1M0 init complete
-- ma_logger.log -> 2009-07-23 13:06:55 INFO maptip.run Started MapTIP for 1M0
-- ma_logger.log -> 2009-07-24 07:11:26 DEBUG maptip.localizeTask Copying map input for 1M0: /mr_temp/jobs/1/input/1M0.in
-- ma_logger.log -> 2009-07-24 07:11:26 DEBUG _hdfs.copy_file_to_local Executing HDFS command: ['/state/partition1/hadoop-0.16.3/bin/hadoop', 'fs', '-copyToLocal', '/mr_temp/jobs/1/input/1M0.in', '/state/partition1/mr_temp/1/input/']
-- ma_logger.log -> 2009-07-24 07:11:30 INFO _hdfs.copy_file_to_local Copied HDFS file /mr_temp/jobs/1/input/1M0.in to a local path /state/partition1/mr_temp/1/input/
-- ma_logger.log -> 2009-07-24 07:11:30 INFO maptip.localizeTask Copied all map inputs for 1M0: ['1M0.in']
-- ma_logger.log -> 2009-07-24 07:11:30 INFO processrunner.start_and_end_process Child process 1M0 running: 9483
    -- nohup.out -> Running MRPlusMapTaskRunner process pid 9483
    -- nohup.out -> MRMapTaskRunner received all args for 1M0
    -- nohup.out -> 2009-07-24 07:11:30 INFO taskrunner.__init__ TaskRunner inited 1M0
    -- nohup.out -> 2009-07-24 07:11:30 INFO maptaskrunner.startTask Actual processing on map 1M0 STARTED in a separate PROCESS
    -- nohup.out -> 2009-07-24 07:11:30 INFO maptaskrunner.startTask Input file to map 1M0: /asdas/asdas/sad
    -- nohup.out -> 2009-07-24 07:11:30 INFO maptaskrunner.startTask Input file to map 1M0 read completely: /asdas/asdas/sad
    -- nohup.out -> 2009-07-24 07:11:30 INFO maptaskrunner.startTask Actual processing on map 1M0 ENDED
    -- nohup.out -> 2009-07-24 07:11:30 INFO mrplusmaptaskrunner.writeOutputToLocalFile Starting to write map 1M0 outputs
    -- nohup.out -> 2009-07-24 07:11:30 INFO mrplusmaptaskrunner.writeOutputToLocalFile Written map 1M0 output: /asdas/asdas/sad
    -- nohup.out -> 2009-07-24 07:11:30 INFO mrplusmaptaskrunner.writeOutputToLocalFile Written map 1M0 output: /asdas/asdas/sa2
    -- nohup.out -> 2009-07-24 07:11:30 INFO mrplusmaptaskrunner.writeOutputToLocalFile Written 2 outputs for map 1M0
-- ma_logger.log -> 2009-07-24 07:11:30 INFO processrunner.start_and_end_process Child process 1M0 - pid 9483 ended in success
            OR
-- ma_logger.log -> 2009-07-24 07:11:30 ERROR processrunner.start_and_end_process Child process 1M0 - pid 9483 ended in failure with this return code -1
-- ma_logger.log -> 2009-07-24 07:11:30 DEBUG nodetracker.taskComplete NT got a notification from an M task 0 of job-id 1 for completion
-- ma_logger.log -> 2009-07-24 07:11:30 INFO nodetracker.taskComplete Completing Map Task 0 of job-id 1


Reduce flow
-- ma_logger.log -> 2009-07-23 13:07:26 INFO nodetracker.startNewReduceTask A Reduce task 4 initiated with Job Id 1
-- ma_logger.log -> 2009-07-23 16:04:33 DEBUG processrunner.__init__ ProcessRunner 1R4 init complete
-- ma_logger.log -> 2009-07-23 13:06:55 INFO reducetip.run Started ReduceTIP for 1R4
"""